package com.dahuan.sink;

import com.dahuan.bean.SensorReading;
import com.dahuan.source.MySensor;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class JDBC_Sink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );

        //数据源是自定义数据源
        DataStreamSource<SensorReading> addSource = env.addSource( new MySensor() );

        //数据落地
        addSource.addSink( new MyJDBC() );

        env.execute( "JDBC_Sink" );

    }

    public static class MyJDBC extends RichSinkFunction<SensorReading> {

        Connection conn = null;
        PreparedStatement insert = null;
        PreparedStatement update = null;

        //初始化
        @Override
        public void open(Configuration conf) throws Exception {
            conn = DriverManager.getConnection( "jdbc:mysql://localhost:3306/test", "root", "123" );
            insert = conn.prepareStatement( "insert into sensor_temp (id, temp) values (?, ?)" );
            update = conn.prepareStatement( "update sensor_temp set temp = ? where id = ?" );
        }

        //每来一条语句调用一条语句
        @Override
        public void invoke(SensorReading value, Context context) throws Exception {
            //先进行更新，如果没有，就直接插入
            update.setDouble( 1,value.getTemperature() );
            update.setString( 2,value.getId() );
            update.execute();
            // 判断是否有值
            if (update.getUpdateCount() == 0){
                insert.setDouble( 1,value.getTemperature() );
                insert.setString( 2,value.getId() );
                insert.execute();
            }
        }

        //关闭连接
        @Override
        public void close() throws Exception {
            conn.close();
            insert.close();
            update.close();
        }
    }
}
